Anonlink Entity Service API¶
This tutorial demonstrates directly interacting with the entity service via the REST API. The primary alternative is to use a library or command line tool such as `anonlink-client
<https://anonlink-client.readthedocs.io/>`__ which can handle the communication with the anonlink entity service.
Dependencies¶
In this tutorial we interact with the REST API using the requests
Python library. Additionally we use the clkhash
Python library to define the linkage schema and to encode the PII. The synthetic dataset comes from the recordlinkage
package. All the dependencies can be installed with pip:
pip install requests clkhash recordlinkage
Steps¶
- Check connection to Anonlink Entity Service
- Synthetic Data generation and encoding
- Create a new linkage project
- Upload the encodings
- Create a run
- Retrieve and analyse results
[1]:
import json
import os
import time
import requests
from IPython.display import clear_output
Check Connection¶
If you are connecting to a custom entity service, change the address here.
[2]:
server = os.getenv("SERVER", "https://anonlink.easd.data61.xyz")
url = server + "/api/v1/"
print(f'Testing anonlink-entity-service hosted at {url}')
Testing anonlink-entity-service hosted at https://anonlink.easd.data61.xyz/api/v1/
[3]:
requests.get(url + 'status').json()
[3]:
{'project_count': 2, 'rate': 9129125, 'status': 'ok'}
Data preparation¶
This section won’t be explained in great detail as it directly follows the clkhash tutorials.
We encode a synthetic dataset from the recordlinkage
library using clkhash
.
[4]:
from tempfile import NamedTemporaryFile
from recordlinkage.datasets import load_febrl4
[5]:
dfA, dfB = load_febrl4()
[6]:
with open('a.csv', 'w') as a_csv:
dfA.to_csv(a_csv, line_terminator='\n')
with open('b.csv', 'w') as b_csv:
dfB.to_csv(b_csv, line_terminator='\n')
Schema Preparation¶
The linkage schema must be agreed on by the two parties. A hashing schema instructs clkhash
how to treat each column for encoding PII into CLKs. A detailed description of the hashing schema can be found in the clkhash documentation.
A linkage schema can either be defined as Python code as shown here, or as a JSON file (shown in other tutorials). The importance of each field is controlled by the k
parameter in the FieldHashingProperties
. We ignore the record id and social security id fields so they won’t be incorporated into the encoding.
[7]:
import clkhash
from clkhash.comparators import *
from clkhash.field_formats import *
schema = clkhash.randomnames.NameList.SCHEMA
_missing = MissingValueSpec(sentinel='')
schema.fields = [
Ignore('rec_id'),
StringSpec('given_name',
FieldHashingProperties(
NgramComparison(2),
BitsPerTokenStrategy(15))),
StringSpec('surname',
FieldHashingProperties(
NgramComparison(2),
BitsPerTokenStrategy(15))),
IntegerSpec('street_number',
FieldHashingProperties(
NgramComparison(1, positional=True),
BitsPerTokenStrategy(15),
missing_value=_missing)),
StringSpec('address_1',
FieldHashingProperties(
NgramComparison(2),
BitsPerTokenStrategy(15))),
StringSpec('address_2',
FieldHashingProperties(
NgramComparison(2),
BitsPerTokenStrategy(15))),
StringSpec('suburb',
FieldHashingProperties(
NgramComparison(2),
BitsPerTokenStrategy(15))),
IntegerSpec('postcode',
FieldHashingProperties(
NgramComparison(1, positional=True),
BitsPerTokenStrategy(15))),
StringSpec('state',
FieldHashingProperties(
NgramComparison(2),
BitsPerTokenStrategy(15))),
IntegerSpec('date_of_birth',
FieldHashingProperties(
NgramComparison(1, positional=True),
BitsPerTokenStrategy(15),
missing_value=_missing)),
Ignore('soc_sec_id')
]
Encoding¶
Transforming the raw personally identity information into CLK encodings following the defined schema. See the clkhash documentation for further details on this.
[8]:
from clkhash import clk
with open('a.csv') as a_pii:
hashed_data_a = clk.generate_clk_from_csv(a_pii, 'secret', schema, validate=False)
with open('clks_a.json', 'w') as f:
json.dump({'clks': hashed_data_a}, f)
with open('b.csv') as b_pii:
hashed_data_b = clk.generate_clk_from_csv(b_pii, 'secret', schema, validate=False)
with open('clks_b.json', 'w') as f:
json.dump({'clks': hashed_data_b}, f)
generating CLKs: 100%|██████████| 5.00k/5.00k [00:01<00:00, 2.84kclk/s, mean=643, std=45.7]
generating CLKs: 100%|██████████| 5.00k/5.00k [00:01<00:00, 3.16kclk/s, mean=631, std=52.9]
Create Linkage Project¶
The analyst carrying out the linkage starts by creating a linkage project of the desired output type with the Entity Service.
[9]:
project_spec = {
"schema": {},
"result_type": "groups",
"number_parties": 2,
"name": "API Tutorial Test"
}
credentials = requests.post(url + 'projects', json=project_spec).json()
project_id = credentials['project_id']
a_token, b_token = credentials['update_tokens']
credentials
[9]:
{'project_id': '277ecc511e18dc10382b77f711dfbcf57f028405d57e650c',
'result_token': '7878f100f21bfb041f2b7411ac8eba7246fd43f08586c8c7',
'update_tokens': ['81984628f5e0178cccfae34207581836f8cc1e39f92f9532',
'3b13b1528d084baa51c2d25f715b3701cccb3f292c561eb9']}
The server returns a project_id
, a result_token
and a set of update_tokens
, one for each data provider. - The project_id
references the project uniquely on the server. - The result_token
authorises project API requests, i.e., access to the result of the linkage. - The update_tokens
authorise the data upload. There is one update_token
for each data provider, and each token can only be used once.
Note: the analyst will need to pass on the project_id
(the id of the linkage project) and one of the update_tokens
to each data provider.
The result_token
can also be used to carry out project API requests:
[10]:
requests.get(url + 'projects/{}'.format(project_id),
headers={"Authorization": credentials['result_token']}).json()
[10]:
{'error': False,
'name': 'API Tutorial Test',
'notes': '',
'number_parties': 2,
'parties_contributed': 0,
'project_id': '277ecc511e18dc10382b77f711dfbcf57f028405d57e650c',
'result_type': 'groups',
'schema': {},
'uses_blocking': False}
Now the two clients can upload their data providing the appropriate upload tokens.
CLK Upload¶
there are currently two different ways of uploading CLKs to the entity server.
Method 1: Direct Upload¶
The ‘clks’ endpoint accepts CLKs in both json and binary format. However, this method is not recommended for large datasets, as uploads can not be resumed and might time out.
[ ]:
a_response = requests.post(
'{}projects/{}/clks'.format(url, project_id),
json={'clks': hashed_data_a},
headers={"Authorization": a_token}
).json()
[ ]:
b_response = requests.post(
'{}projects/{}/clks'.format(url, project_id),
json={'clks': hashed_data_b},
headers={"Authorization": b_token}
).json()
Method 2. Upload to object store.¶
The entity service can be deployed with an object store. This object store can be used by the data providers to upload their CLKs. First, the data provider have to request a set of temporary credentials which authorise the upload to the object store. The returned Temporary Object Store Credentials can be used with any S3 compatible client. For example by using boto3 in Python. The returned credentials are restricted to allow only uploading data to a particular path in a particular bucket for a finite period (defaulting to 12 hours). After the client uploaded the data, he informs the entity service.
Note this feature may be disabled by the administrator, in this case the endpoint will return a 500 server error.
[11]:
from minio import Minio
upload_response = requests.get(
url + 'projects/{}/authorize-external-upload'.format(project_id),
headers={'Authorization': a_token},
).json()
upload_credentials = upload_response['credentials']
upload_info = upload_response['upload']
# Use Minio python client to upload data
mc = Minio(
upload_info['endpoint'],
access_key=upload_credentials['AccessKeyId'],
secret_key=upload_credentials['SecretAccessKey'],
session_token=upload_credentials['SessionToken'],
region='us-east-1',
secure=upload_info['secure']
)
etag = mc.fput_object(
upload_info['bucket'],
upload_info['path'] + "/clks_a.json",
'clks_a.json',
metadata={
"hash-count": 5000,
"hash-size": 128
})
# Should be able to notify the service that we've uploaded data
res = requests.post(url + f"projects/{project_id}/clks",
headers={'Authorization': a_token},
json={
'encodings': {
'file': {
'bucket': upload_info['bucket'],
'path': upload_info['path'] + "/clks_a.json",
}
}
})
print(f'Upload party A: {"OK" if res.status_code == 201 else "ERROR"}')
#party B:
upload_response = requests.get(
url + 'projects/{}/authorize-external-upload'.format(project_id),
headers={'Authorization': b_token},
).json()
upload_credentials = upload_response['credentials']
upload_info = upload_response['upload']
# Use Minio python client to upload data
mc = Minio(
upload_info['endpoint'],
access_key=upload_credentials['AccessKeyId'],
secret_key=upload_credentials['SecretAccessKey'],
session_token=upload_credentials['SessionToken'],
region='us-east-1',
secure=upload_info['secure']
)
etag = mc.fput_object(
upload_info['bucket'],
upload_info['path'] + "/clks_b.json",
'clks_b.json',
metadata={
"hash-count": 5000,
"hash-size": 128
})
# Should be able to notify the service that we've uploaded data
res = requests.post(url + f"projects/{project_id}/clks",
headers={'Authorization': b_token},
json={
'encodings': {
'file': {
'bucket': upload_info['bucket'],
'path': upload_info['path'] + "/clks_b.json",
}
}
})
print(f'Upload party B: {"OK" if res.status_code == 201 else "ERROR"}')
Upload party A: OK
Upload party B: OK
Every upload gets a receipt token. In some operating modes this receipt is required to access the results.
Create a run¶
Now the project has been created and the CLK encodings have been uploaded we can carry out some privacy preserving record linkage. The same encoding data can be linked using different threshold values by creating runs.
[12]:
run_response = requests.post(
"{}projects/{}/runs".format(url, project_id),
headers={"Authorization": credentials['result_token']},
json={
'threshold': 0.80,
'name': "Tutorial Run #1"
}
).json()
[13]:
run_id = run_response['run_id']
Run Status¶
[14]:
requests.get(
'{}projects/{}/runs/{}/status'.format(url, project_id, run_id),
headers={"Authorization": credentials['result_token']}
).json()
[14]:
{'current_stage': {'description': 'waiting for CLKs',
'number': 1,
'progress': {'absolute': 2,
'description': 'number of parties already contributed',
'relative': 1.0}},
'stages': 3,
'state': 'created',
'time_added': '2020-06-15T05:41:44.177808',
'time_started': None}
Results¶
Now after some delay (depending on the size) we can fetch the results. This can of course be done by directly polling the REST API using requests
, however for simplicity we will just use the watch_run_status function provided in anonlinkclient.rest_client
.
Note theserver
is provided rather thanurl
.
[15]:
from anonlinkclient.rest_client import RestClient, format_run_status
rest_client = RestClient(server)
for update in rest_client.watch_run_status(project_id, run_id, credentials['result_token'], timeout=300):
clear_output(wait=True)
print(format_run_status(update))
State: completed
Stage (3/3): compute output
[16]:
data = json.loads(rest_client.run_get_result_text(
project_id,
run_id,
credentials['result_token']))
This result is the 1-1 mapping between rows that were more similar than the given threshold.
[17]:
for i in range(10):
((_, a_index), (_, b_index)) = sorted(data['groups'][i])
print("a[{}] maps to b[{}]".format(a_index, b_index))
print("...")
a[174] maps to b[4485]
a[2570] maps to b[3737]
a[1920] maps to b[4157]
a[420] maps to b[4323]
a[136] maps to b[1416]
a[3090] maps to b[4797]
a[2940] maps to b[3663]
a[2228] maps to b[1095]
a[2623] maps to b[3447]
a[672] maps to b[2795]
...
In this dataset there are 5000 records in common. With the chosen threshold and schema we currently retrieve:
[18]:
len(data['groups'])
[18]:
4842
Cleanup¶
If you want you can delete the run and project from the anonlink-entity-service.
[19]:
requests.delete("{}/projects/{}".format(url, project_id), headers={"Authorization": credentials['result_token']})
[19]:
<Response [204]>
[ ]: